package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformPromotionMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionUserBucketMessage;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.promotion.mq.producer.DefaultProducer;
import com.ruyuan.eshop.promotion.mq.splitter.ListSplitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormPromotionUserBucketListener implements MessageListenerConcurrently {

    /**
     * 账户服务
     */
    @DubboReference(version = "1.0.0")
    private AccountApi accountApi;

    /**
     * 发送消息共用的线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;

    /**
     * rocketmq生产者
     */
    @Autowired
    private DefaultProducer defaultProducer;

    /**
     * 并发消费消息
     *
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {

        try {
            for (MessageExt messageExt : msgList) {
                // 1、获取到一个推送任务分片
                String message = new String(messageExt.getBody());
                log.info("执行平台发送促销活动用户桶消息逻辑，消息内容：{}", message);
                PlatformPromotionUserBucketMessage promotionPushTask = JSON.parseObject(message, PlatformPromotionUserBucketMessage.class);

                // 2、查询本次推送任务分片对应的用户群体
                Long startUserId = promotionPushTask.getStartUserId();
                Long endUserId = promotionPushTask.getEndUserId();

                JsonResult<List<MembershipAccountDTO>> queryResult = accountApi.queryAccountByIdRange(startUserId, endUserId);
                if (!queryResult.getSuccess()) {
                    throw new BaseBizException(queryResult.getErrorCode(), queryResult.getErrorMessage());
                }

                List<MembershipAccountDTO> membershipAccounts = queryResult.getData();
                if (CollectionUtils.isEmpty(membershipAccounts)) {
                    log.info("根据用户桶内的id范围没有查询到用户, startUserId={}, endUserId{}", startUserId, endUserId);
                    continue;
                }

                // 3、为每个用户创建一个符合推送系统的规定格式，用户推送消息
                // 再把每个用户推送消息，写到mq里去
                // 第一种，用我们的线程池，并发的把一个任务分片里的1000个消息，并发写到mq里去
                // 一个分片任务有1000个用户的话，此时虽然是多线程并发，但是还是要写1000次mq

                PlatformPromotionMessage promotionMessage = PlatformPromotionMessage.builder()
                        .promotionId(promotionPushTask.getPromotionId())
                        .promotionType(promotionPushTask.getPromotionType())
                        .mainMessage(promotionPushTask.getMainMessage())
                        .message("您已获得活动资格，打开APP进入活动页面")
                        .informType(promotionPushTask.getInformType())
                        .build();

                // 线程池 + mq批量发送消息，
                // rocketmq官网对批量发送消息的说明是，一个batch不能超过1mb，在rocketmq源码中实际上批量消息不能超过4MB
                // 所以批量发送的时候，需要综合考虑发送消息的大小，
                // 再根据网络压力和IO压力综合对比评估之后选择每批次发送多少条
                // 此处我们按照100条一批发送，1000条用户推送消息，会合并为10个batch去做一个推送
                // 只要去发起10次网络请求就可以了，每个任务分片的处理到写mq，速度是非常快的
                // 1w个分片任务，每个任务要写10次mq，10w次，每次10ms，100 0000ms，1000s，20分钟左右
                // 单线程不停的发送，才是这样子算的，我们是多台营销系统的机器，多机器，对每个分片任务的10个batch都是线程池并发写的
                // 2台机器->50个线程，1000s / 50 = 几十s，就可以把1w个分片任务再这里处理完毕，以及写完，对用户系统的查询耗时，也差不多了
                List<String> messages = new ArrayList<>();
                for (MembershipAccountDTO account : membershipAccounts) {
                    promotionMessage.setUserAccountId(account.getId());
                    messages.add(JSON.toJSONString(promotionMessage));
                }
                log.info("本次推送消息数量,{}",messages.size());
                ListSplitter splitter = new ListSplitter(messages,MESSAGE_BATCH_SIZE);
                while(splitter.hasNext()){
                    List<String> sendBatch = splitter.next();
                    log.info("本次批次消息数量,{}",sendBatch.size());
                    sharedSendMsgThreadPool.execute(() -> {
                        defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, sendBatch, "平台优惠活动消息",10000);
                    });
                }
            }
        } catch (Exception e) {
            log.error("consume error,促销活动消息消费失败", e);
            // 本次消费失败，下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
